[feature](RoutineLoad) Support the Amazon Kinesis#61325
[feature](RoutineLoad) Support the Amazon Kinesis#613250AyanamiRei wants to merge 58 commits intoapache:masterfrom
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
FE UT Coverage ReportIncrement line coverage |
|
/review |
There was a problem hiding this comment.
I found 3 correctness issues in this PR.
-
be/src/load/stream_load/stream_load_executor.cpp: the Kinesis error path does not resetctx->kinesis_info->cmt_sequence_numberwhen plan execution fails.KinesisDataConsumerGroup::start_all()has already copied the last consumed sequence numbers into the context before the fragment/txn result is known, so a failed attempt can leave advanced progress in memory for the retried task. Kafka explicitly rewinds here; Kinesis needs the same protection to avoid skipping records after an aborted batch. -
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: cloud mode is not blocked, but the cloud routine-load transaction path is still Kafka-only.RoutineLoadManagercan create Kinesis jobs in cloud mode,KinesisTaskInfosetscloudCluster, and thenTxnUtil.rlTaskTxnCommitAttachmentToPb()still castsattachment.getProgress()toKafkaProgress. The first cloud Kinesis commit will therefore fail with aClassCastExceptioninstead of persisting progress. -
regression-test/conf/regression-conf.groovy: the newload_p0/kinesis_routine_loaddirectory is added toexcludeDirectories, so the entire new regression suite is skipped by default. That leaves the feature effectively untested in CI even though the PR adds many Kinesis cases.
Critical checkpoint conclusions:
- Goal of the task / correctness / proof: The PR clearly aims to add end-to-end Kinesis routine-load support, but the current code does not fully achieve that because retry safety and cloud-mode behavior are still broken. Tests were added, but the regression config currently excludes them.
- Small / clear / focused: Not fully. This is a broad cross-layer feature (FE, BE, thrift/proto, persistence, tests), so the risk is naturally high and missing parity with existing Kafka paths matters.
- Concurrency: The feature introduces new consumer-group and concurrent progress-tracking paths. I did not find a primary lock-order bug in the reviewed hunks, but the retry/reset issue shows the lifecycle between consumer progress and transaction failure is not yet safe.
- Lifecycle / initialization: No static initialization issue identified in the reviewed code.
- Configuration items: New Kinesis properties are added. I did not validate dynamic-config behavior because these are routine-load job properties rather than mutable process configs.
- Compatibility changes: Yes. New thrift/proto enums and structs are introduced. The most concrete compatibility/runtime gap I found is the cloud transaction attachment path remaining Kafka-specific.
- Parallel code paths: Yes. Kafka already resets progress on failure and has a complete cloud progress path; Kinesis needs the same treatment.
- Special conditional checks: The explicit
Config.isCloudMode()branches in Kinesis code currently lead to incomplete behavior and should either be implemented fully or rejected early. - Test coverage: Functional tests were added, including restart/error scenarios, but they are excluded in
regression-conf.groovy, so effective coverage is currently missing. - Observability: Basic logs are present; no major observability blocker found in the reviewed paths.
- Transaction / persistence: Yes, this feature touches transaction commit attachments and persisted progress. The cloud attachment serialization path is currently incorrect for Kinesis.
- Data write / modification: Yes. Retry safety is not guaranteed because failed attempts can retain advanced Kinesis progress.
- FE/BE variable passing: The new FE/BE Kinesis progress structures are mostly wired, but cloud-specific passing is incomplete.
- Performance: I did not identify a primary performance regression worth blocking on relative to the correctness issues above.
- Other issues: No additional blocker beyond the three findings above.
Because of the issues above, I cannot consider the current implementation correct yet.
…rganization Fix test file imports to use new package paths: - kafka.KafkaProgress, kafka.KafkaRoutineLoadJob, kafka.KafkaTaskInfo - kinesis.KinesisProgress, kinesis.KinesisRoutineLoadJob This fixes compilation errors in test files after moving Kafka and Kinesis classes to their respective subdirectories.
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Fix FE compilation failures introduced by routine load package migration (kafka/kinesis subpackages) by aligning method visibility for cross-package overrides and补齐 missing imports in routine load source and related FE tests.
### Release note
None
### Check List (For Author)
- Test: Manual test
- Manual test: FE build `FE_MAVEN_THREADS=16 FE_MAVEN_RETRY_THREADS=4 DISABLE_BUILD_UI=ON DISABLE_BUILD_HIVE_UDF=ON DISABLE_BE_JAVA_EXTENSIONS=ON DISABLE_JAVA_CHECK_STYLE=ON MAVEN_OPTS='-Xms1g -Xmx4g -XX:+UseG1GC -XX:ActiveProcessorCount=16' MVN_OPT='-U -s /tmp/doris-maven-settings-doris.xml' bash build.sh --fe -j 16` (BUILD SUCCESS)
- Behavior changed: No (compile and test-compile compatibility fix)
- Does this need documentation: No
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: FE checkstyle failed with import ordering, unused import, and redundant same-package import errors in routine load and related tests.
### Release note
None
### Check List (For Author)
- Test: Manual test
- Manual test (user run): ./build.sh --fe -j16
- Behavior changed: No
- Does this need documentation: No
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
|
run buildall |
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
FE UT Coverage ReportIncrement line coverage |
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Kinesis conf relied on indirect precompiled-header includes for Status and Doris logging macros, which can fail in non-PCH builds.
### Release note
None
### Check List (For Author)
- Test: Manual test
- Header self-contained syntax check for kinesis_conf.h
- Non-PCH syntax check for kinesis_conf.cpp
- Behavior changed: No
- Does this need documentation: No
|
run buildall |
|
run buildall |
FE Regression Coverage ReportIncrement line coverage |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
/review |
|
@sollhui Except for two Docker cases, all other tests have passed locally. |
|
PR approved by anyone and no changes requested. |
What problem does this PR solve?
Issue Number: close #xxx
Related PR: this pr should merge after #62184
Problem Summary:
support the Amazon Kinesis for routine load.
compare AWS Kinesis with Kafka:
Kinesis Stream <=> Kafka Topic
shards <=> partition
doc pr:apache/doris-website#3521
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)